python 多线程、多进程

导读

真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行

对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。

有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)当然,真正地同时执行多线程需要多核CPU才可能实现

我们前面编写的所有的Python程序,都是执行单任务的进程,也就是只有一个线程。

线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。
多进程和多线程的程序涉及到同步、数据共享的问题,编写起来更复杂。

多线程

添加线程

# 导入模块
import threading

# 获取已激活的线程数
threading.active_count()

# 查看所有线程信息
threading.enumerate()

# 查看现在正在运行的线程
threading.current_thread()


# 添加线程,threading.Thread()接收参数target代表这个线程要完成的任务
def thread_job():
print('This is a thread of %s' % threading.current_thread())

def main():
thread = threading.Thread(target=thread_job,) # 定义线程
thread.start() # 让线程开始工作

if __name__ == '__main__':
main()

join功能

使用join对控制多个线程的执行顺序非常关键。join功能是等待调用线程完成再继续下面的操作。推荐如下这种1221的V型排布。
示例代码

def T1_job():
print("T1 start\n")
for i in range(10):
time.sleep(0.1)
print("T1 finish\n")

def T2_job():
print("T2 start\n")
print("T2 finish\n")

thread_1 = threading.Thread(target=T1_job, name='T1')
thread_2 = threading.Thread(target=T2_job, name='T2')

----------------------------------------------------------------------

thread_1.start() # start T1
thread_2.start() # start T2
thread_2.join() # join for T2
thread_1.join() # join for T1
print("all done\n")

"""
T1 start
T2 start
T2 finish
T1 finish
all done
"""

setDaemon功能

setDaemon()方法。主线程A中,创建了子线程B,并且在主线程A中调用了B.setDaemon(),这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出.这就是setDaemon方法的含义,这基本和join是相反的。
示例代码

import threading 
import time 

class MyThread(threading.Thread): 
        def __init__(self,id): 
                threading.Thread.__init__(self) 
        def run(self): 
                time.sleep(5) 
                print "This is " + self.getName() 
    
if __name__ == "__main__": 
        t1=MyThread(999) 
        t1.setDaemon(True) 
        t1.start() 
        print "I am the father thread." 

"""
I am the father thread.
"""
# 可以看出,子线程t1中的内容并未打出。

线程锁Lock

lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。
示例代码

import threading

def job1():
global A,lock
lock.acquire()
for i in range(10):
A+=1
print('job1',A)
lock.release()

def job2():
global A,lock
lock.acquire()
for i in range(10):
A+=10
print('job2',A)
lock.release()

if __name__== '__main__':
lock=threading.Lock()
A=0
t1=threading.Thread(target=job1)
t2=threading.Thread(target=job2)
t1.start()
t2.start()
t1.join()
t2.join()

"""
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110
"""

其他常用操作

import threading

# 队列
from queue import Queue

def job(i, q):
print("i'm thread {} ".format(i))
i = i**2
# 将结果存入队列中,因为多线程函数不能有return
q.put(i)


q =Queue()
threads = []
for i in range(4):
t = threading.Thread(target=job,args=(i, q))
t.start()
# 将进程加入列表
threads.append(t)

# 逐个join
for thread in threads:
thread.join()

# 得到队列中元素,result
# 测试后发现结果顺序有一定概率不准确。。。
result = []
for _ in range(4):
result.append(q.get())
print(result)

GIL

Python 的设计上, 有一个必要的环节, 就是 Global Interpreter Lock (GIL). 这个东西让 Python 还是一次性只能处理一个东西.
GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。

所以Python的多线程就是假的。。。。

多进程

Python提供多进程的原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 GIL.

添加进程

示例代码

# 多进程
import multiprocessing as mp

def job(a,d):
print('aaaaa')
print(mp.current_process())

if __name__=='__main__':
p1 = mp.Process(target=job,args=(1,2))
print(mp.current_process())
p1.start()
p1.join()

进程池

进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题
示例代码

import multiprocessing as mp

def job(x):
return x*x

# 定义一个pool
pool = mp.Pool()


# 有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。

# 接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核
res = pool.map(job, range(10))
print(res)

自定义核数量

Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
示例代码

def multicore():
pool = mp.Pool(processes=3) # 定义CPU核数量为3
res = pool.map(job, range(10))
print(res)

apply_async

示例代码

import multiprocessing as mp

def job(x):
return x*x

pool = mp.Pool()
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
print([res.get() for res in multi_res])

# 和map差不多

共享内存

我们可以通过使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp

value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。更多的形式请查看此表.

| Type code | C Type             | Python Type       | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |

在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。

array = mp.Array('i', [1, 2, 3, 4])

这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。

进程锁

为了解决不同进程抢共享资源的问题,我们可以用加进程锁来解决。
示例代码

import multiprocessing as mp
import time

def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放

def multicore():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()

if __name__ == '__main__':
multicore()

"""
1
2
3
4
5
8
11
14
17
20
"""
# 显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

Author: pangzibo243
Link: https://litianbo243.github.io/2019/08/05/python 多线程、多进程/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
支付宝打赏
微信打赏